-
Notifications
You must be signed in to change notification settings - Fork 502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
services/horizon/internal/db2/history: Use FastBatchInsertBuilder to insert transactions into the history_transactions #4950
Conversation
386b660
to
4467f16
Compare
Table: q.GetTable("history_transactions"), | ||
MaxBatchSize: maxBatchSize, | ||
}, | ||
table: "history_transactions", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a constant available to reference instead of hardcode table name here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no constant. I could create one but this would be the only place where it would get referenced.
} | ||
|
||
// transactionBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder | ||
type transactionBatchInsertBuilder struct { | ||
encodingBuffer *xdr.EncodingBuffer | ||
builder db.BatchInsertBuilder | ||
table string | ||
builder db.FastBatchInsertBuilder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it be worthwhile to store the query session instance in the builder instance state i.e. : q: &Q
, then don't have to pass it on TransactionBatchInsertBuilder.Exec(ctx context.Context) error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason why I don't include the session in the struct is because I want to make it explicit that only the Exec()
function is acting on the DB. The Add()
function is only accumulating rows in-memory. Eventually, we will refactor the ingestion processors so that the processors are not constructed with a db session and only the Commit()
function will take a db session.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, understood. just a thought exercise, in a way, does that force sql/db persistence aspect onto the ingest lifecycle interfaces, i.e. what if the builders or processors were talking to a no-sql source instead, if they maintain reference to their persistence layer internally through a struct reference, then the ingest interface methods like Exec/Commit can be storage type agnostic, with no parameters referencing specific backend types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a good point. I'll keep that in mind when we implement #4909 . Perhaps it would be better to not have a session in the Commit()
functions.
tt.Assert.NoError(q.Begin()) | ||
insertBuilder = q.NewTransactionBatchInsertBuilder() | ||
tt.Assert.NoError(insertBuilder.Add(secondTransaction, sequence)) | ||
tt.Assert.Error(insertBuilder.Exec(tt.Ctx, q)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the duplicate error message/content no longer present in the error returned from copy execution? is there any other unique indicators in that error to capture? asking more-so for debugging this type of output in logs at runtime, if there were duplicate tx's attempted on insert, would there a be logged error content stating that aspect at some other level if not captured in the error from fast batch builder?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 4c07d3b
@@ -210,8 +213,10 @@ func TestInsertTransactionDoesNotAllowDuplicateIndex(t *testing.T) { | |||
test.ResetHorizonDB(t, tt.HorizonDB) | |||
q := &Q{tt.HorizonSession()} | |||
|
|||
tt.Assert.NoError(q.Begin()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the significance of starting a db session transaction here rather than on ln 92, after the last db commit and before the same session gets used on liquidity pool query? Have noticed direct calls to q.commit()
in tests using the builder also, just want to understand the nuances with the new fast builder requiring that if so?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the Exec()
function needs to be invoked within a transaction:
The reason is that the lib/pq implementation of COPY does not allow you to execute COPY outside of a transaction
@@ -168,7 +168,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor(ledger xdr.LedgerHeaderHisto | |||
// when in online mode, the submission result processor must always run (regardless of filtering) | |||
var p []horizonTransactionProcessor | |||
if s.config.EnableIngestionFiltering { | |||
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ, uint32(ledger.Header.LedgerSeq)) | |||
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.session, s.historyQ, uint32(ledger.Header.LedgerSeq)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the significance between ProcessorRunner.session and ProcessorRunner.historyQ, as they are both set to the same db.SessionInterface
could this be reverted to just pass ProcessorRunner.historyQ for obtaining reference/access to the same pooled db session instance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The historyQ value implements both interfaces in https://github.com/tamirms/go/blob/transactions-batch/services/horizon/internal/ingest/processor_runner.go#L91-L92. Note that history.IngestionQ
and db.SessionInterface
are distinct interfaces
Eventually we will refactor the ingestion processors so that they don't need the db instance except during the Commit()
phase (see #4909 ). When that PR is implemented we will remove the duplicate fields in ProcessorRunner and only have one db session field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few comments, leave to your discretion on whether you feel they warrant changes or not. overall, looks good!
PR Checklist
PR Structure
otherwise).
services/friendbot
, orall
ordoc
if the changes are broad or impact manypackages.
Thoroughness
.md
files, etc... affected by this change). Take a look in the
docs
folder for a given service,like this one.
Release planning
needed with deprecations, added features, breaking changes, and DB schema changes.
semver, or if it's mainly a patch change. The PR is targeted at the next
release branch if it's not a patch change.
What
In #4916 we introduced a FastBatchInsertBuilder which can be used to quickly insert a large amount of rows into Postgres via the COPY command. In this PR, we use FastBatchInsertBuilder to insert transactions into the history_transactions table.
Why
As a part of #4908 , we want to update all the code which inserts data into the history tables to use the FastBatchInsertBuilder.
Known limitations
[N/A]